package cn.doitedu.dao;

import cn.doitedu.cache.CacheManager;
import cn.doitedu.cache.CacheManagerRedisImpl;
import cn.doitedu.pojo.CombineCondition;
import cn.doitedu.pojo.EventCondition;
import cn.doitedu.pojo.EventParam;
import cn.doitedu.pojo.LogBean;
import cn.doitedu.utils.ConnectionUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import sun.misc.Cache;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class HistoryEventClickHouseDaoImpl implements HistoryEventDao {

    private Connection clickHouseConn;

    private CacheManager cacheManager;

    public void init(ParameterTool parameterTool) throws Exception {
        clickHouseConn = ConnectionUtils.getClickHouseConn(parameterTool);

        cacheManager = new CacheManagerRedisImpl(parameterTool);
    }

    @Override
    public void close() throws Exception {
        clickHouseConn.close();
    }

    @Override
    public String queryEventSequenceStr(LogBean bean, CombineCondition combineCondition) throws Exception {



        //在查询ClickHouse前，检查缓存是否可用？
        String deviceId = bean.getDeviceId();
        String cacheId = combineCondition.getCacheId();
        String bigKey = deviceId + ":" + cacheId;
        long startTime = combineCondition.getStartTime();
        long endTime = combineCondition.getEndTime();
        String smallKey = startTime + ":" + endTime;


        //可以使用缓存的三种情况
        //①直接命中
        //实际缓存的数据 |t4----t8|
        //希望查询的数据 |t4----t8|
        String cacheData = cacheManager.getData(bigKey, smallKey);
        if (cacheData != null) return cacheData;

        //②部分命中
        //实际缓存的数据 |t4----t8|
        //希望查询的数据 |t4----t8|t9--t10|
        Set<String> keys = cacheManager.getTimeRanges(deviceId);
        for (String key : keys) {
            String[] fields = key.split(":");
            long start = Long.parseLong(fields[0]);
            long end = Long.parseLong(fields[1]);
            if (startTime == start && endTime > end) {
                cacheData = cacheManager.getData(bigKey, key);
                String seqStr = queryEventSequenceStrByTimeRange(deviceId, combineCondition, end, endTime);
                //将数据存储到缓存中
                cacheManager.setValueEx(bigKey, startTime+ ":" +endTime, cacheData + seqStr, 3600*12);
                return cacheData + seqStr;
            }
        }

        //③部分命中
        //实际缓存的数据         |t4----t8|
        //希望查询的数据 |t1-----|t4----t8|
        keys = cacheManager.getTimeRanges(deviceId);
        for (String key : keys) {
            String[] fields = key.split(":");
            long start = Long.parseLong(fields[0]);
            long end = Long.parseLong(fields[1]);
            if (startTime < start && endTime == end) {
                cacheData = cacheManager.getData(bigKey, key);
                String seqStr = queryEventSequenceStrByTimeRange(deviceId, combineCondition, startTime, start);
                //将数据存储到缓存中
                cacheManager.setValueEx(bigKey, startTime+ ":" +endTime, seqStr + cacheData, 3600*12);
                return seqStr + cacheData;
            }
        }

       return queryEventSequenceStrByTimeRange(deviceId, combineCondition, startTime, endTime);
    }

    private String queryEventSequenceStrByTimeRange(String deviceId, CombineCondition combineCondition, long startTime, long endTime) throws Exception {

        String sql = combineCondition.getSql();
        //获取规则中事先定义好的事件（关心的事件）
        List<String> targetEvents = combineCondition.getEventConditions().stream().map(EventCondition::getEventId).collect(Collectors.toList());

        PreparedStatement preparedStatement = clickHouseConn.prepareStatement(sql);
        preparedStatement.setString(1, deviceId);
        preparedStatement.setLong(2, startTime);
        preparedStatement.setLong(3, endTime);
        ResultSet resultSet = preparedStatement.executeQuery();
        StringBuffer sb = new StringBuffer();
        if (resultSet.next()) {
            //将行为字符串映射成数字或字母
            String eventId = resultSet.getString(1);
            sb.append(targetEvents.indexOf(eventId) + 1);
        }
        return sb.toString();


    }

}
